package com.lelebd;

import com.lelebd.bean.LoginEvent;
import com.lelebd.bean.LoginFailWarning;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.ArrayList;

/**
 * 检测登录事件中，短时间内连续登录失败N次的用户并且生成预警信息
 */
public class LoginFail {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //1、从正文中读取数据
        URL resource = LoginFail.class.getResource("/LoginLog.csv");
        DataStreamSource<String> loginEventData = env.readTextFile(resource.getPath());
        SingleOutputStreamOperator<LoginEvent> loginEventStream = loginEventData.map(new MapFunction<String, LoginEvent>() {
            @Override
            public LoginEvent map(String value) throws Exception {
                String[] vals = value.split(",");
                //Long userId, String ip, String loginState, Long timestamp
                return new LoginEvent(Long.valueOf(vals[0]), vals[1], vals[2], Long.valueOf(vals[3]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
            @Override
            public long extractTimestamp(LoginEvent element) {
                return element.getTimestamp() * 1000L;
            }
        });

        //2、自定义出来函数检测连续登录失败事件
        loginEventStream
                .keyBy(LoginEvent::getUserId)
                .process(new LoginFailDetectWarning(2))
                .print("连续登录失败预警信息");
        env.execute("登录风控预警");
    }

    public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning> {
        //定义连续失败的次数
        private Integer maxFailTimes;

        public LoginFailDetectWarning(int maxFailTimes) {
            this.maxFailTimes = maxFailTimes;
        }

        //定义状态：保存当前N秒内所有的登录失败记录
        ListState<LoginEvent> loginFailEventListState = null;
        //定义状态：保存注册的定时器时间戳
        ValueState<Long> timeTsState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail", LoginEvent.class));
            timeTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
        }

        @Override
        public void processElement(LoginEvent event, Context ctx, Collector<LoginFailWarning> out) throws Exception {
            //判断当前登录时间的类型
            if ("fail".equals(event.getLoginState())) {
                //如果是失败事件。添加到listState中
                loginFailEventListState.add(event);
                //如果没有定时器，注册一个N秒的定时器
                if (timeTsState.value() == null) {
                    Long ts = (event.getTimestamp() + 2) * 1000L;
                    ctx.timerService().registerEventTimeTimer(ts);
                    timeTsState.update(ts);
                }
            } else {
                //如果登录成功，则删除定时器，清空状态
                if (timeTsState.value() != null) {
                    ctx.timerService().deleteEventTimeTimer(timeTsState.value());
                    loginFailEventListState.clear();
                    timeTsState.clear();
                }
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<LoginFailWarning> out) throws Exception {
            //定时器触发。说明至少是N秒内没有登录成功出来。判断liststate中失败的个数
            ArrayList<LoginEvent> loginEventList = Lists.newArrayList(loginFailEventListState.get());
            Integer failTimes = loginEventList.size();
            if (failTimes >= maxFailTimes) {
                //Long userId, Long firstFailTime, Long lastFailTime, String warningMsg
                out.collect(new LoginFailWarning(ctx.getCurrentKey(),
                        loginEventList.get(0).getTimestamp(),
                        loginEventList.get(loginEventList.size() - 1).getTimestamp(),
                        "login fail in 2s for " + failTimes + " times")
                );
            }

            loginFailEventListState.clear();
            timeTsState.clear();
        }
    }
}
